// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_coroutine.h"
#include "rgw_sync_module.h"
#include "rgw_data_sync.h"
#include "rgw_sync_module_aws.h"
#include "rgw_cr_rados.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rest.h"
#include "rgw_acl.h"
#include "rgw_zone.h"

#include "services/svc_zone.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw


/* default (in bytes) of both s3 "multipart_sync_threshold" and "multipart_min_part_size" */
#define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)

using namespace std;

/* target path template used when neither the profile nor the default profile
 * provides a "target_path"; the ${...} variables are expanded later */
static string default_target_path = "rgw-${zonegroup}-${sid}/${bucket}";

/*
 * Build the object id for a key: the key name, followed by ":<instance>"
 * when the key carries a non-empty instance that is not the null instance.
 */
static string get_key_oid(const rgw_obj_key& key)
{
  const bool plain_name_only = (key.instance.empty() || key.have_null_instance());
  if (plain_name_only) {
    return key.name;
  }
  return key.name + ":" + key.instance;
}

/* Render an object as "<bucket name>/<key oid>". */
static string obj_to_aws_path(const rgw_obj& obj)
{
  string aws_path = obj.bucket.name;
  aws_path += "/";
  aws_path += get_key_oid(obj.key);
  return aws_path;
}

/*

   json configuration definition:

    {
      "connection": {
        "access_key": <access>,
        "secret": <secret>,
        "endpoint": <endpoint>,
        "host_style": <path | virtual>,
      },
      "acls": [ { "type": <id | email | uri>,
                  "source_id": <source_id>,
                  "dest_id": <dest_id> } ... ],  # optional, acl mappings, no mappings if does not exist
      "target_path": <target_path>, # override default
           

      # anything below here is for non-trivial configuration
      # can be used in conjunction with the above

      "default": {
        "connection": {
            "access_key": <access>,
            "secret": <secret>,
            "endpoint": <endpoint>,
            "host_style": <path | virtual>,
        },
        "acls": [    # list of source uids and how they map into destination uids in the dest objects acls
        {
          "type" : <id | email | uri>,   #  optional, default is id
          "source_id": <id>,
          "dest_id": <id>
        } ... ]
        "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path,
                                               # final object name will be target_path + "/" + obj
      },
      "connections": [
          {
            "id": <id>,
            "access_key": <access>,
            "secret": <secret>,
            "endpoint": <endpoint>,
          } ... ],
      "acl_profiles": [
          {
            "id": <id>, # acl mappings
            "acls": [ {
                "type": <id | email | uri>,
                "source_id": <id>,
                "dest_id": <id>
              } ... ]
          }
      ],
      "profiles": [
          {
           "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
           "target_path": <dest>,   # (override default)
           "connection_id": <connection_id>, # optional, if empty references default connection
           "acls_id": <mappings_id>, # optional, if empty references default mappings
          } ... ],
    }

target path optional variables:

(evaluated at init)
sid: sync instance id, randomly generated by sync process on first sync initialization
zonegroup: zonegroup name
zonegroup_id: zonegroup id
zone: zone name
zone_id: zone id

(evaluated when syncing)
bucket: bucket name
owner: bucket owner

*/

struct ACLMapping {
  ACLGranteeTypeEnum type{ACL_TYPE_CANON_USER};
  string source_id;
  string dest_id;

  ACLMapping() = default;

  ACLMapping(ACLGranteeTypeEnum t,
             const string& s,
             const string& d) : type(t),
  source_id(s),
  dest_id(d) {}

  void init(const JSONFormattable& config) {
    const string& t = config["type"];

    if (t == "email") {
      type = ACL_TYPE_EMAIL_USER;
    } else if (t == "uri") {
      type = ACL_TYPE_GROUP;
    } else {
      type = ACL_TYPE_CANON_USER;
    }

    source_id = config["source_id"];
    dest_id = config["dest_id"];
  }

  void dump_conf(CephContext *cct, JSONFormatter& jf) const {
    Formatter::ObjectSection os(jf, "acl_mapping");
    string s;
    switch (type) {
      case ACL_TYPE_EMAIL_USER:
        s = "email";
        break;
      case ACL_TYPE_GROUP:
        s = "uri";
        break;
      default:
        s = "id";
        break;
    }
    encode_json("type", s, &jf);
    encode_json("source_id", source_id, &jf);
    encode_json("dest_id", dest_id, &jf);
  }
};

struct ACLMappings {
  map<string, ACLMapping> acl_mappings;

  void init(const JSONFormattable& config) {
    for (auto& c : config.array()) {
      ACLMapping m;
      m.init(c);

      acl_mappings.emplace(std::make_pair(m.source_id, m));
    }
  }
  void dump_conf(CephContext *cct, JSONFormatter& jf) const {
    Formatter::ArraySection os(jf, "acls");

    for (auto& i : acl_mappings) {
      i.second.dump_conf(cct, jf);
    }
  }
};

struct AWSSyncConfig_ACLProfiles {
  map<string, std::shared_ptr<ACLMappings> > acl_profiles;

  void init(const JSONFormattable& config) {
    for (auto& c : config.array()) {
      const string& profile_id = c["id"];

      std::shared_ptr<ACLMappings> ap{new ACLMappings};
      ap->init(c["acls"]);

      acl_profiles[profile_id] = ap;
    }
  }
  void dump_conf(CephContext *cct, JSONFormatter& jf) const {
    Formatter::ArraySection section(jf, "acl_profiles");

    for (auto& p : acl_profiles) {
      Formatter::ObjectSection section(jf, "profile");
      encode_json("id", p.first, &jf);
      p.second->dump_conf(cct, jf);
    }
  }

  bool find(const string& profile_id, ACLMappings *result) const {
    auto iter = acl_profiles.find(profile_id);
    if (iter == acl_profiles.end()) {
      return false;
    }
    *result = *iter->second;
    return true;
  }
};

struct AWSSyncConfig_Connection {
  string connection_id;
  string endpoint;
  RGWAccessKey key;
  std::optional<string> region;
  HostStyle host_style{PathStyle};

  bool has_endpoint{false};
  bool has_key{false};
  bool has_host_style{false};

  void init(const JSONFormattable& config) {
    has_endpoint = config.exists("endpoint");
    has_key = config.exists("access_key") || config.exists("secret");
    has_host_style = config.exists("host_style");

    connection_id = config["id"];
    endpoint = config["endpoint"];

    key = RGWAccessKey(config["access_key"], config["secret"]);

    if (config.exists("region")) {
      region = config["region"];
    } else {
      region.reset();
    }

    string host_style_str = config["host_style"];
    if (host_style_str != "virtual") {
      host_style = PathStyle;
    } else {
      host_style = VirtualStyle;
    }
  }
  void dump_conf(CephContext *cct, JSONFormatter& jf) const {
    Formatter::ObjectSection section(jf, "connection");
    encode_json("id", connection_id, &jf);
    encode_json("endpoint", endpoint, &jf);
    string s = (host_style == PathStyle ? "path" : "virtual");
    encode_json("region", region, &jf);
    encode_json("host_style", s, &jf);

    {
      Formatter::ObjectSection os(jf, "key");
      encode_json("access_key", key.id, &jf);
      string secret = (key.key.empty() ? "" : "******");
      encode_json("secret", secret, &jf);
    }
  }
};

static int conf_to_uint64(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval)
{
  string sval;
  if (config.find(key, &sval)) {
    string err;
    uint64_t val = strict_strtoll(sval.c_str(), 10, &err);
    if (!err.empty()) {
      ldpp_dout(dpp, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl;
      return -EINVAL;
    }
    *pval = val;
  }
  return 0;
}

/* S3 specific tunables (the "s3" section of the configuration). */
struct AWSSyncConfig_S3 {
  uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
  uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};

  /* load both settings (each keeps its current value when absent);
   * returns a negative error code if either one fails to parse */
  int init(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) {
    int ret = conf_to_uint64(dpp, cct, config, "multipart_sync_threshold", &multipart_sync_threshold);
    if (ret >= 0) {
      ret = conf_to_uint64(dpp, cct, config, "multipart_min_part_size", &multipart_min_part_size);
    }
    if (ret < 0) {
      return ret;
    }

    /* a configured part size below the minimum is raised to the minimum */
#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
    const bool below_minimum = (multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE);
    if (below_minimum) {
      multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
    }
    return 0;
  }

  /* emit the settings as an "s3" object */
  void dump_conf(CephContext *cct, JSONFormatter& jf) const {
    Formatter::ObjectSection s3_section(jf, "s3");
    encode_json("multipart_sync_threshold", multipart_sync_threshold, &jf);
    encode_json("multipart_min_part_size", multipart_min_part_size, &jf);
  }
};

struct AWSSyncConfig_Profile {
  string source_bucket;
  bool prefix{false};
  string target_path;
  string connection_id;
  string acls_id;

  std::shared_ptr<AWSSyncConfig_Connection> conn_conf;
  std::shared_ptr<ACLMappings> acls;

  std::shared_ptr<RGWRESTConn> conn;

  void init(const JSONFormattable& config) {
    source_bucket = config["source_bucket"];

    prefix = (!source_bucket.empty() && source_bucket[source_bucket.size() - 1] == '*');

    if (prefix) {
      source_bucket = source_bucket.substr(0, source_bucket.size() - 1);
    }

    target_path = config["target_path"];
    connection_id = config["connection_id"];
    acls_id = config["acls_id"];

    if (config.exists("connection")) {
      conn_conf = make_shared<AWSSyncConfig_Connection>();
      conn_conf->init(config["connection"]);
    }

    if (config.exists("acls")) {
      acls = make_shared<ACLMappings>();
      acls->init(config["acls"]);
    }
  }

  void dump_conf(CephContext *cct, JSONFormatter& jf, const char *section = "config") const {
    Formatter::ObjectSection config(jf, section);
    string sb{source_bucket};
    if (prefix) {
      sb.append("*");
    }
    encode_json("source_bucket", sb, &jf);
    encode_json("target_path", target_path, &jf);
    encode_json("connection_id", connection_id, &jf);
    encode_json("acls_id", acls_id, &jf);
    if (conn_conf.get()) {
      conn_conf->dump_conf(cct, jf);
    }
    if (acls.get()) {
      acls->dump_conf(cct, jf);
    }
  }
};

/*
 * Replace every occurrence of `find` in `src` with `replace` and store the
 * result in *dest.
 *
 * - `src` and `*dest` may refer to the same string; the work is done on a
 *   private copy.
 * - Replaced text is never rescanned, so `replace` may itself contain `find`.
 * - An empty `find` matches nothing; *dest then receives a copy of `src`.
 */
static void find_and_replace(const std::string& src, const std::string& find, const std::string& replace, std::string *dest)
{
  std::string s = src;

  if (!find.empty()) {
    size_t pos = s.find(find);
    while (pos != std::string::npos) {
      s.replace(pos, find.size(), replace);
      /* resume right after the text just inserted: its length is
       * replace.size(), not find.size() */
      pos = s.find(find, pos + replace.size());
    }
  }

  *dest = s;
}

static void apply_meta_param(const string& src, const string& param, const string& val, string *dest)
{
  string s = string("${") + param + "}";
  find_and_replace(src, s, val, dest);
}


/*
 * Parsed configuration of the cloud sync module (see the json layout
 * described at the top of this file).
 *
 *  - default_profile:   the "default" section; supplies fallbacks
 *  - root_profile:      profile built from the top level of the config,
 *                       used for buckets no explicit profile matches
 *  - connections:       the "connections" array, by id
 *  - acl_profiles:      the "acl_profiles" array, by id
 *  - explicit_profiles: every profile set up through init_target() (the
 *                       root profile included), keyed by source bucket
 *  - s3:                the "s3" section
 */
struct AWSSyncConfig {
  AWSSyncConfig_Profile default_profile;
  std::shared_ptr<AWSSyncConfig_Profile> root_profile;

  map<string, std::shared_ptr<AWSSyncConfig_Connection> > connections;
  AWSSyncConfig_ACLProfiles acl_profiles;

  map<string, std::shared_ptr<AWSSyncConfig_Profile> > explicit_profiles;

  AWSSyncConfig_S3 s3;

  /*
   * Resolve the references of an already parsed profile: connection (by id,
   * inline, or the default one), acl mappings (by id, inline, or the default
   * ones) and target path (own, default profile's, or default_target_path).
   *
   * Returns -EINVAL when a referenced id does not exist, when both an
   * inline connection and a connection_id are given, or when
   * connection_must_exist is set and no connection could be resolved.
   * Returns 0 otherwise.
   */
  int init_profile(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile,
                   bool connection_must_exist) {
    if (!profile.connection_id.empty()) {
      if (profile.conn_conf) {
        ldpp_dout(dpp, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl;
        return -EINVAL;
      }
      auto conn_iter = connections.find(profile.connection_id);
      if (conn_iter == connections.end()) {
        ldpp_dout(dpp, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl;
        return -EINVAL;
      }
      profile.conn_conf = conn_iter->second;
    } else if (!profile.conn_conf) {
      /* neither an id nor an inline connection: try the default's id */
      profile.connection_id = default_profile.connection_id;
      auto conn_iter = connections.find(profile.connection_id);
      if (conn_iter != connections.end()) {
        profile.conn_conf = conn_iter->second;
      }
    }

    if (connection_must_exist && !profile.conn_conf) {
      ldpp_dout(dpp, 0) << "ERROR: remote connection undefined for sync profile" << dendl;
      return -EINVAL;
    }

    /* fill in whatever the connection did not set explicitly from the
     * default profile's connection */
    if (profile.conn_conf && default_profile.conn_conf) {
      if (!profile.conn_conf->has_endpoint) {
        profile.conn_conf->endpoint = default_profile.conn_conf->endpoint;
      }
      if (!profile.conn_conf->has_host_style) {
        profile.conn_conf->host_style = default_profile.conn_conf->host_style;
      }
      if (!profile.conn_conf->has_key) {
        profile.conn_conf->key = default_profile.conn_conf->key;
      }
    }

    if (!profile.acls_id.empty()) {
      auto acl_iter = acl_profiles.acl_profiles.find(profile.acls_id);
      if (acl_iter == acl_profiles.acl_profiles.end()) {
        ldpp_dout(dpp, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl;
        return -EINVAL;
      }
      profile.acls = acl_iter->second;
    } else if (!profile.acls) {
      if (default_profile.acls) {
        profile.acls = default_profile.acls;
        profile.acls_id = default_profile.acls_id;
      }
    }

    if (profile.target_path.empty()) {
      profile.target_path = default_profile.target_path;
    }
    if (profile.target_path.empty()) {
      profile.target_path = default_target_path;
    }

    return 0;
  }

  /*
   * Parse and resolve one profile section and register it in
   * explicit_profiles under its source bucket (a duplicate is logged and
   * replaced). A connection is mandatory. On success the new profile is
   * also returned through *ptarget when ptarget is not null.
   */
  int init_target(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
    auto profile = std::make_shared<AWSSyncConfig_Profile>();
    profile->init(profile_conf);

    int ret = init_profile(dpp, cct, profile_conf, *profile, true);
    if (ret < 0) {
      return ret;
    }

    auto& sb = profile->source_bucket;

    if (explicit_profiles.find(sb) != explicit_profiles.end()) {
      ldpp_dout(dpp, 0) << "WARNING: duplicate target configuration in sync module" << dendl;
    }

    explicit_profiles[sb] = profile;
    if (ptarget) {
      *ptarget = profile;
    }
    return 0;
  }

  /*
   * Look up the registered profile for a bucket. Only the single entry
   * whose key is the greatest one not greater than the bucket name is
   * examined: it matches if it equals the name, or if it is a prefix
   * profile and the name starts with it.
   */
  bool do_find_profile(const rgw_bucket& bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
    const string& name = bucket.name;
    auto iter = explicit_profiles.upper_bound(name);
    if (iter == explicit_profiles.begin()) {
      return false;
    }

    --iter;
    if (iter->first.size() > name.size()) {
      return false;
    }
    if (name.compare(0, iter->first.size(), iter->first) != 0) {
      return false;
    }

    std::shared_ptr<AWSSyncConfig_Profile>& target = iter->second;

    if (!target->prefix &&
        name.size() != iter->first.size()) {
      return false;
    }

    *result = target;
    return true;
  }

  /* like do_find_profile(), falling back to the root profile */
  void find_profile(const rgw_bucket& bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
    if (!do_find_profile(bucket, result)) {
      *result = root_profile;
    }
  }

  AWSSyncConfig() {}

  /*
   * Build the whole configuration from json. Returns a negative error code
   * if the s3 section, the root profile or any entry of "profiles" is
   * invalid; on success the parsed result is logged at level 5.
   */
  int init(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) {
    auto& default_conf = config["default"];

    if (config.exists("default")) {
      default_profile.init(default_conf);
      /* NOTE(review): this runs before "connections" and "acl_profiles"
       * are parsed below and its return value is ignored, so a default
       * section that references a connection_id / acls_id only logs an
       * error here. Behavior is kept as is -- confirm whether intended. */
      init_profile(dpp, cct, default_conf, default_profile, false);
    }

    for (auto& conn : config["connections"].array()) {
      auto new_conn = conn;

      std::shared_ptr<AWSSyncConfig_Connection> c{new AWSSyncConfig_Connection};
      c->init(new_conn);

      connections[new_conn["id"]] = c;
    }

    acl_profiles.init(config["acl_profiles"]);

    int r = s3.init(dpp, cct, config["s3"]);
    if (r < 0) {
      return r;
    }

    auto new_root_conf = config;

    r = init_target(dpp, cct, new_root_conf, &root_profile); /* the root profile config */
    if (r < 0) {
      return r;
    }

    for (const auto& target_conf : config["profiles"].array()) {
      r = init_target(dpp, cct, target_conf, nullptr);
      if (r < 0) {
        return r;
      }
    }

    JSONFormatter jf(true);
    dump_conf(cct, jf);
    stringstream ss;
    jf.flush(ss);

    ldpp_dout(dpp, 5) << "sync module config (parsed representation):\n" << ss.str() << dendl;

    return 0;
  }

  /*
   * Expand the init-time variables (sid, zonegroup, zonegroup_id, zone,
   * zone_id) of a target path template and store the result in *dest.
   *
   * Every substitution is applied on top of the previous one through a
   * local string, so the result is correct whether or not `path` and
   * `*dest` refer to the same string.
   */
  void expand_target(RGWDataSyncCtx *sc, const string& sid, const string& path, string *dest) {
      string expanded;
      apply_meta_param(path, "sid", sid, &expanded);

      const RGWZoneGroup& zg = sc->env->svc->zone->get_zonegroup();
      apply_meta_param(expanded, "zonegroup", zg.get_name(), &expanded);
      apply_meta_param(expanded, "zonegroup_id", zg.get_id(), &expanded);

      const RGWZone& zone = sc->env->svc->zone->get_zone();
      apply_meta_param(expanded, "zone", zone.name, &expanded);
      apply_meta_param(expanded, "zone_id", zone.id, &expanded);

      *dest = expanded;
  }

  /* expand the init-time variables of every profile's target path in place */
  void update_config(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, const string& sid) {
    expand_target(sc, sid, root_profile->target_path, &root_profile->target_path);
    ldpp_dout(dpp, 20) << "updated target: (root) -> " << root_profile->target_path << dendl;
    for (auto& t : explicit_profiles) {
      expand_target(sc, sid, t.second->target_path, &t.second->target_path);
      ldpp_dout(dpp, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl;
    }
  }

  /* emit the parsed configuration as a "config" object */
  void dump_conf(CephContext *cct, JSONFormatter& jf) const {
    Formatter::ObjectSection config(jf, "config");
    root_profile->dump_conf(cct, jf);
    jf.open_array_section("connections");
    for (const auto& c : connections) {
      c.second->dump_conf(cct, jf);
    }
    jf.close_section();

    acl_profiles.dump_conf(cct, jf);

    { // targets
      Formatter::ArraySection as(jf, "profiles");
      for (auto& t : explicit_profiles) {
        Formatter::ObjectSection target_section(jf, "profile");
        encode_json("name", t.first, &jf);
        t.second->dump_conf(cct, jf);
      }
    }
  }

  /*
   * Compute the full destination path of an object: the profile's target
   * path with ${bucket} and ${owner} expanded, followed by "/" and the
   * object's key oid. For a tenanted owner the bucket is rendered as
   * "<tenant>-<bucket name>" and the owner as "<tenant>-<owner id>".
   */
  string get_path(std::shared_ptr<AWSSyncConfig_Profile>& profile,
                  const RGWBucketInfo& bucket_info,
                  const rgw_obj_key& obj) {
    string bucket_str;
    string owner;
    /* NOTE(review): when the owner has no tenant, `owner` stays empty and
     * ${owner} expands to an empty string -- confirm this is intended. */
    if (!bucket_info.owner.tenant.empty()) {
      bucket_str = owner = bucket_info.owner.tenant + "-";
      owner += bucket_info.owner.id;
    }
    bucket_str += bucket_info.bucket.name;

    const string& path = profile->target_path;

    string new_path;
    apply_meta_param(path, "bucket", bucket_str, &new_path);
    apply_meta_param(new_path, "owner", owner, &new_path);

    new_path += string("/") + get_key_oid(obj);

    return new_path;
  }

  /* split the path computed by get_path() at its first '/' into the
   * destination bucket name and the destination object name */
  void get_target(std::shared_ptr<AWSSyncConfig_Profile>& profile,
                  const RGWBucketInfo& bucket_info,
                  const rgw_obj_key& obj,
                  string *bucket_name,
                  string *obj_name) {
    string path = get_path(profile, bucket_info, obj);
    size_t pos = path.find('/');

    *bucket_name = path.substr(0, pos);
    *obj_name = path.substr(pos + 1);
  }

  /* expand the target paths for sync instance `id` and create the remote
   * connection object of the root profile and of every registered profile */
  void init_conns(RGWDataSyncCtx *sc, const string& id) {
    auto sync_env = sc->env;

    update_config(sync_env->dpp, sc, id);

    auto& root_conf = root_profile->conn_conf;

    root_profile->conn.reset(new S3RESTConn(sc->cct,
                                           id,
                                           { root_conf->endpoint },
                                           root_conf->key,
                                           sync_env->svc->zone->get_zonegroup().get_id(),
                                           root_conf->region,
                                           root_conf->host_style));

    for (auto& i : explicit_profiles) {
      auto& c = i.second;

      c->conn.reset(new S3RESTConn(sc->cct,
                                   id,
                                   { c->conn_conf->endpoint },
                                   c->conn_conf->key,
                                   sync_env->svc->zone->get_zonegroup().get_id(),
                                   c->conn_conf->region,
                                   c->conn_conf->host_style));
    }
  }
};


struct AWSSyncInstanceEnv {
  AWSSyncConfig conf;
  string id;

  explicit AWSSyncInstanceEnv(AWSSyncConfig& _conf) : conf(_conf) {}

  void init(RGWDataSyncCtx *sc, uint64_t instance_id) {
    char buf[32];
    snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id);
    id = buf;

    conf.init_conns(sc, id);
  }

  void get_profile(const rgw_bucket& bucket, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
    conf.find_profile(bucket, ptarget);
    ceph_assert(ptarget);
  }
};

static int do_decode_rest_obj(const DoutPrefixProvider *dpp, CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
{
  for (auto header : headers) {
    const string& val = header.second;
    if (header.first == "RGWX_OBJECT_SIZE") {
      info->content_len = atoi(val.c_str());
    } else {
      info->attrs[header.first] = val;
    }
  }

  info->acls.set_ctx(cct);
  auto aiter = attrs.find(RGW_ATTR_ACL);
  if (aiter != attrs.end()) {
    bufferlist& bl = aiter->second;
    auto bliter = bl.cbegin();
    try {
      info->acls.decode(bliter);
    } catch (buffer::error& err) {
      ldpp_dout(dpp, 0) << "ERROR: failed to decode policy off attrs" << dendl;
      return -EIO;
    }
  } else {
    ldpp_dout(dpp, 0) << "WARNING: acl attrs not provided" << dendl;
  }

  return 0;
}

class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
{
  RGWDataSyncCtx *sc;
  RGWRESTConn *conn;
  const rgw_obj& src_obj;
  RGWRESTConn::get_obj_params req_params;

  rgw_sync_aws_src_obj_properties src_properties;
public:
  RGWRESTStreamGetCRF(CephContext *_cct,
                               RGWCoroutinesEnv *_env,
                               RGWCoroutine *_caller,
                               RGWDataSyncCtx *_sc,
                               RGWRESTConn *_conn,
                               const rgw_obj& _src_obj,
                               const rgw_sync_aws_src_obj_properties& _src_properties) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller,
                                                                                                                      _sc->env->http_manager, _src_obj.key),
                                                                                 sc(_sc), conn(_conn), src_obj(_src_obj),
                                                                                 src_properties(_src_properties) {
  }

  int init(const DoutPrefixProvider *dpp) override {
    /* init input connection */


    req_params.get_op = true;
    req_params.prepend_metadata = true;

    req_params.unmod_ptr = &src_properties.mtime;
    req_params.etag = src_properties.etag;
    req_params.mod_zone_id = src_properties.zone_short_id;
    req_params.mod_pg_ver = src_properties.pg_ver;

    if (range.is_set) {
      req_params.range_is_set = true;
      req_params.range_start = range.ofs;
      req_params.range_end = range.ofs + range.size - 1;
    }

    RGWRESTStreamRWRequest *in_req;
    int ret = conn->get_obj(dpp, src_obj, req_params, false /* send */, &in_req);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
      return ret;
    }

    set_req(in_req);

    return RGWStreamReadHTTPResourceCRF::init(dpp);
  }

  int decode_rest_obj(const DoutPrefixProvider *dpp, map<string, string>& headers, bufferlist& extra_data) override {
    map<string, bufferlist> src_attrs;

    ldpp_dout(dpp, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;

    if (extra_data.length() > 0) {
      JSONParser jp;
      if (!jp.parse(extra_data.c_str(), extra_data.length())) {
        ldpp_dout(dpp, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl;
        return -EIO;
      }

      JSONDecoder::decode_json("attrs", src_attrs, &jp);
    }
    return do_decode_rest_obj(dpp, sc->cct, src_attrs, headers, &rest_obj);
  }

  bool need_extra_data() override {
    return true;
  }
};

/* source object attrs that are forwarded as-is to the destination
 * (in addition to anything starting with "X_AMZ_", see keep_attr()) */
static std::set<string> keep_headers = { "CONTENT_TYPE",
                                         "CONTENT_ENCODING",
                                         "CONTENT_DISPOSITION",
                                         "CONTENT_LANGUAGE" };

/*
 * Write side of an object transfer: uploads to dest_obj through the
 * target profile's connection, either as a plain put or as one part of a
 * multipart upload. dest_obj is held by reference, so the referenced
 * object has to stay alive for as long as this instance is used.
 */
class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
{
  RGWDataSyncCtx *sc;
  rgw_sync_aws_src_obj_properties src_properties;
  std::shared_ptr<AWSSyncConfig_Profile> target;
  const rgw_obj& dest_obj;
  string etag;
public:
  RGWAWSStreamPutCRF(CephContext *_cct,
                               RGWCoroutinesEnv *_env,
                               RGWCoroutine *_caller,
                               RGWDataSyncCtx *_sc,
                               const rgw_sync_aws_src_obj_properties&  _src_properties,
                               std::shared_ptr<AWSSyncConfig_Profile>& _target,
                               const rgw_obj& _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sc->env->http_manager),
                                                     sc(_sc), src_properties(_src_properties), target(_target), dest_obj(_dest_obj) {
  }

  /* create the put request; for a multipart part the "uploadId" and
   * "partNumber" parameters are added */
  int init() override {
    /* init output connection */
    RGWRESTStreamS3PutObj *out_req{nullptr};

    if (multipart.is_multipart) {
      char buf[32];
      snprintf(buf, sizeof(buf), "%d", multipart.part_num);
      rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
                                       { "partNumber", buf },
                                       { nullptr, nullptr } };
      target->conn->put_obj_send_init(dest_obj, params, &out_req);
    } else {
      target->conn->put_obj_send_init(dest_obj, nullptr, &out_req);
    }

    set_req(out_req);

    return RGWStreamWriteHTTPResourceCRF::init();
  }

  /* whether a source attr is forwarded to the destination */
  static bool keep_attr(const string& h) {
    return (keep_headers.find(h) != keep_headers.end() ||
            boost::algorithm::starts_with(h, "X_AMZ_"));
  }

  /*
   * Build the attrs sent with a plain put into *attrs (cleared first):
   *  - the source attrs accepted by keep_attr()
   *  - "x-amz-grant-*" entries derived from the source acl, restricted to
   *    grantees that have an entry in the target's acl mappings
   *  - "x-amz-meta-rgwx-*" entries describing the source object
   */
  static void init_send_attrs(const DoutPrefixProvider *dpp,
                              CephContext *cct,
                              const rgw_rest_obj& rest_obj,
                              const rgw_sync_aws_src_obj_properties& src_properties,
                              const AWSSyncConfig_Profile *target,
                              map<string, string> *attrs) {
    auto& new_attrs = *attrs;

    new_attrs.clear();

    for (auto& hi : rest_obj.attrs) {
      if (keep_attr(hi.first)) {
        new_attrs.insert(hi);
      }
    }

    auto acl = rest_obj.acls.get_acl();

    /* permission -> list of "<type>=<grantee>" */
    map<int, vector<string> > access_map;

    if (target->acls) {
      for (auto& grant : acl.get_grant_map()) {
        auto& orig_grantee = grant.first;
        auto& perm = grant.second;

        string grantee;

        const auto& am = target->acls->acl_mappings;

        auto iter = am.find(orig_grantee);
        if (iter == am.end()) {
          ldpp_dout(dpp, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
          continue;
        }

        grantee = iter->second.dest_id;

        string type;

        switch (iter->second.type) {
          case ACL_TYPE_CANON_USER:
            type = "id";
            break;
          case ACL_TYPE_EMAIL_USER:
            type = "emailAddress";
            break;
          case ACL_TYPE_GROUP:
            type = "uri";
            break;
          default:
            continue;
        }

        string tv = type + "=" + grantee;

        int flags = perm.get_permission().get_permissions();
        if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
          /* key by the full-control constant, not by the raw flags: any
           * additional bit in flags would otherwise produce a key the
           * header switch below does not know */
          access_map[RGW_PERM_FULL_CONTROL].push_back(tv);
          continue;
        }

        for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
          if (flags & i) {
            access_map[i].push_back(tv);
          }
        }
      }
    }

    for (const auto& aiter : access_map) {
      int grant_type = aiter.first;

      string header_str("x-amz-grant-");

      switch (grant_type) {
        case RGW_PERM_READ:
          header_str.append("read");
          break;
        case RGW_PERM_WRITE:
          header_str.append("write");
          break;
        case RGW_PERM_READ_ACP:
          header_str.append("read-acp");
          break;
        case RGW_PERM_WRITE_ACP:
          header_str.append("write-acp");
          break;
        case RGW_PERM_FULL_CONTROL:
          header_str.append("full-control");
          break;
        default:
          /* never emit a bare "x-amz-grant-" entry */
          continue;
      }

      string s;

      for (const auto& viter : aiter.second) {
        if (!s.empty()) {
          s.append(", ");
        }
        s.append(viter);
      }

      ldpp_dout(dpp, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;

      new_attrs[header_str] = s;
    }

    char buf[32];
    snprintf(buf, sizeof(buf), "%llu", (unsigned long long)src_properties.versioned_epoch);
    new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf;

    utime_t ut(src_properties.mtime);
    snprintf(buf, sizeof(buf), "%lld.%09lld",
             (long long)ut.sec(),
             (long long)ut.nsec());

    new_attrs["x-amz-meta-rgwx-source-mtime"] = buf;
    new_attrs["x-amz-meta-rgwx-source-etag"] = src_properties.etag;
    new_attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
    if (!rest_obj.key.instance.empty()) {
      new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
    }
  }

  /* finalize the request once the source object is known: attrs are only
   * attached to plain puts, multipart parts are sent without any */
  void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) override {
    RGWRESTStreamS3PutObj *r = static_cast<RGWRESTStreamS3PutObj *>(req);

    map<string, string> new_attrs;
    if (!multipart.is_multipart) {
      init_send_attrs(dpp, sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
    }

    r->set_send_length(rest_obj.content_len);

    RGWAccessControlPolicy policy;

    r->send_ready(dpp, target->conn->get_key(), new_attrs, policy);
  }

  /* remember the "ETAG" response header, if there is one */
  void handle_headers(const map<string, string>& headers) {
    auto iter = headers.find("ETAG");
    if (iter != headers.end()) {
      etag = iter->second;
    }
  }

  /* copy the remembered etag into *petag; false when none was received */
  bool get_etag(string *petag) {
    if (etag.empty()) {
      return false;
    }
    *petag = etag;
    return true;
  }
};


/*
 * Coroutine that copies one object to the remote target in a single
 * (non multipart) transfer: it builds a reader for src_obj and a writer
 * for dest_obj and runs an RGWStreamSpliceCR over the two.
 *
 * src_obj and dest_obj are held by reference, so the referenced objects
 * have to stay alive for as long as the coroutine runs.
 */
class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWRESTConn *source_conn;
  std::shared_ptr<AWSSyncConfig_Profile> target;
  const rgw_obj& src_obj;
  const rgw_obj& dest_obj;

  rgw_sync_aws_src_obj_properties src_properties;

  /* read and write ends, created on the first call of operate() */
  std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;

public:
  RGWAWSStreamObjToCloudPlainCR(RGWDataSyncCtx *_sc,
                                RGWRESTConn *_source_conn,
                                const rgw_obj& _src_obj,
                                const rgw_sync_aws_src_obj_properties& _src_properties,
                                std::shared_ptr<AWSSyncConfig_Profile> _target,
                                const rgw_obj& _dest_obj) : RGWCoroutine(_sc->cct),
                                                   sc(_sc),
                                                   source_conn(_source_conn),
                                                   target(_target),
                                                   src_obj(_src_obj),
                                                   dest_obj(_dest_obj),
                                                   src_properties(_src_properties) {}

  /* coroutine body: completes with the splice's error code if it is
   * negative, otherwise completes successfully */
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      /* init input */
      in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc,
                                           source_conn, src_obj,
                                           src_properties));

      /* init output */
      out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc,
                                           src_properties, target, dest_obj));

      /* stream from in_crf to out_crf; retcode holds the result on resume */
      yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      return set_cr_done();
    }

    return 0;
  }
};

/*
 * Coroutine that uploads a single part of a multipart upload.
 *
 * The byte range described by part_info (ofs/size) is read from the source
 * object and streamed to the destination as part part_info.part_num of
 * upload_id. On success the etag reported by the PUT request is stored in
 * *petag.
 *
 * src_obj and dest_obj are stored as references, so the caller must keep the
 * referenced objects alive for the lifetime of this coroutine.
 */
class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWRESTConn *source_conn;
  std::shared_ptr<AWSSyncConfig_Profile> target;
  const rgw_obj& src_obj;
  const rgw_obj& dest_obj;

  rgw_sync_aws_src_obj_properties src_properties;

  string upload_id;

  rgw_sync_aws_multipart_part_info part_info;

  /* reader/writer ends handed to the splice coroutine */
  std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;

  /* output: etag of the uploaded part */
  string *petag;

public:
  RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncCtx *_sc,
                                RGWRESTConn *_source_conn,
                                const rgw_obj& _src_obj,
                                std::shared_ptr<AWSSyncConfig_Profile>& _target,
                                const rgw_obj& _dest_obj,
                                const rgw_sync_aws_src_obj_properties& _src_properties,
                                const string& _upload_id,
                                const rgw_sync_aws_multipart_part_info& _part_info,
                                string *_petag)
    : RGWCoroutine(_sc->cct),
      sc(_sc),
      source_conn(_source_conn),
      target(_target),
      src_obj(_src_obj),
      dest_obj(_dest_obj),
      src_properties(_src_properties),
      upload_id(_upload_id),
      part_info(_part_info),
      petag(_petag)
  {}

  /*
   * Finishes with the splice error code on failure, or -EIO when the PUT
   * request yielded no etag.
   */
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      /* reader side, limited to this part's byte range */
      in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc,
                                           source_conn, src_obj,
                                           src_properties));
      in_crf->set_range(part_info.ofs, part_info.size);

      /* writer side, addressed at this part of the multipart upload */
      out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc,
                                           src_properties, target, dest_obj));
      out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);

      yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      {
        /* out_crf was created above as an RGWAWSStreamPutCRF */
        auto *put_crf = static_cast<RGWAWSStreamPutCRF *>(out_crf.get());
        const bool have_etag = put_crf->get_etag(petag);
        if (!have_etag) {
          ldpp_dout(dpp, 0) << "ERROR: failed to get etag from PUT request" << dendl;
          return set_cr_error(-EIO);
        }
      }

      return set_cr_done();
    }

    return 0;
  }
};

/*
 * Coroutine that aborts a multipart upload on the destination.
 *
 * Issues a DELETE on the destination object's path with the "uploadId" query
 * parameter set to upload_id. Finishes with the request's error code when it
 * is negative.
 *
 * dest_obj is stored as a reference, so the caller must keep the referenced
 * object alive for the lifetime of this coroutine.
 */
class RGWAWSAbortMultipartCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWRESTConn *dest_conn;
  const rgw_obj& dest_obj;

  string upload_id;

public:
  RGWAWSAbortMultipartCR(RGWDataSyncCtx *_sc,
                        RGWRESTConn *_dest_conn,
                        const rgw_obj& _dest_obj,
                        const string& _upload_id) : RGWCoroutine(_sc->cct),
                                                   sc(_sc),
                                                   dest_conn(_dest_conn),
                                                   dest_obj(_dest_obj),
                                                   upload_id(_upload_id) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {

      yield {
        /* nullptr-terminated list of query parameters */
        rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
        call(new RGWDeleteRESTResourceCR(sc->cct, dest_conn, sc->env->http_manager,
                                         obj_to_aws_path(dest_obj), params));
      }

      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl;
        return set_cr_error(retcode);
      }

      return set_cr_done();
    }

    return 0;
  }
};

class RGWAWSInitMultipartCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWRESTConn *dest_conn;
  const rgw_obj& dest_obj;

  uint64_t obj_size;
  map<string, string> attrs;

  bufferlist out_bl;

  string *upload_id;

  struct InitMultipartResult {
    string bucket;
    string key;
    string upload_id;

    void decode_xml(XMLObj *obj) {
      RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
      RGWXMLDecoder::decode_xml("Key", key, obj);
      RGWXMLDecoder::decode_xml("UploadId", upload_id, obj);
    }
  } result;

public:
  RGWAWSInitMultipartCR(RGWDataSyncCtx *_sc,
                        RGWRESTConn *_dest_conn,
                        const rgw_obj& _dest_obj,
                        uint64_t _obj_size,
                        const map<string, string>& _attrs,
                        string *_upload_id) : RGWCoroutine(_sc->cct),
                                                   sc(_sc),
                                                   dest_conn(_dest_conn),
                                                   dest_obj(_dest_obj),
                                                   obj_size(_obj_size),
                                                   attrs(_attrs),
                                                   upload_id(_upload_id) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {

      yield {
        rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
        bufferlist bl;
        call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager,
                                                 obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl));
      }

      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
        return set_cr_error(retcode);
      }
      {
        /*
         * If one of the following fails we cannot abort upload, as we cannot
         * extract the upload id. If one of these fail it's very likely that that's
         * the least of our problem.
         */
        RGWXMLDecoder::XMLParser parser;
        if (!parser.init()) {
          ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
          return set_cr_error(-EIO);
        }

        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
          string str(out_bl.c_str(), out_bl.length());
          ldpp_dout(dpp, 5) << "ERROR: failed to parse xml: " << str << dendl;
          return set_cr_error(-EIO);
        }

        try {
          RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
        } catch (RGWXMLDecoder::err& err) {
          string str(out_bl.c_str(), out_bl.length());
          ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
          return set_cr_error(-EIO);
        }
      }

      ldpp_dout(dpp, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;

      *upload_id = result.upload_id;

      return set_cr_done();
    }

    return 0;
  }
};

class RGWAWSCompleteMultipartCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWRESTConn *dest_conn;
  const rgw_obj& dest_obj;

  bufferlist out_bl;

  string upload_id;

  struct CompleteMultipartReq {
    map<int, rgw_sync_aws_multipart_part_info> parts;

    explicit CompleteMultipartReq(const map<int, rgw_sync_aws_multipart_part_info>& _parts) : parts(_parts) {}

    void dump_xml(Formatter *f) const {
      for (auto p : parts) {
        f->open_object_section("Part");
        encode_xml("PartNumber", p.first, f);
        encode_xml("ETag", p.second.etag, f);
        f->close_section();
      };
    }
  } req_enc;

  struct CompleteMultipartResult {
    string location;
    string bucket;
    string key;
    string etag;

    void decode_xml(XMLObj *obj) {
      RGWXMLDecoder::decode_xml("Location", bucket, obj);
      RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
      RGWXMLDecoder::decode_xml("Key", key, obj);
      RGWXMLDecoder::decode_xml("ETag", etag, obj);
    }
  } result;

public:
  RGWAWSCompleteMultipartCR(RGWDataSyncCtx *_sc,
                        RGWRESTConn *_dest_conn,
                        const rgw_obj& _dest_obj,
                        string _upload_id,
                        const map<int, rgw_sync_aws_multipart_part_info>& _parts) : RGWCoroutine(_sc->cct),
                                                   sc(_sc),
                                                   dest_conn(_dest_conn),
                                                   dest_obj(_dest_obj),
                                                   upload_id(_upload_id),
                                                   req_enc(_parts) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {

      yield {
        rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
        stringstream ss;
        XMLFormatter formatter;

        encode_xml("CompleteMultipartUpload", req_enc, &formatter);

        formatter.flush(ss);

        bufferlist bl;
        bl.append(ss.str());

        call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager,
                                                 obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl));
      }

      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
        return set_cr_error(retcode);
      }
      {
        /*
         * If one of the following fails we cannot abort upload, as we cannot
         * extract the upload id. If one of these fail it's very likely that that's
         * the least of our problem.
         */
        RGWXMLDecoder::XMLParser parser;
        if (!parser.init()) {
          ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
          return set_cr_error(-EIO);
        }

        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
          string str(out_bl.c_str(), out_bl.length());
          ldpp_dout(dpp, 5) << "ERROR: failed to parse xml: " << str << dendl;
          return set_cr_error(-EIO);
        }

        try {
          RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
        } catch (RGWXMLDecoder::err& err) {
          string str(out_bl.c_str(), out_bl.length());
          ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
          return set_cr_error(-EIO);
        }
      }

      ldpp_dout(dpp, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;

      return set_cr_done();
    }

    return 0;
  }
};


/*
 * Best-effort cleanup of an unfinished multipart sync.
 *
 * First aborts the multipart upload on the destination, then removes the
 * local sync status object. Failures of either step are logged and otherwise
 * ignored; the coroutine always finishes successfully.
 *
 * dest_obj is stored as a reference, so the caller must keep the referenced
 * object alive for the lifetime of this coroutine.
 */
class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWRESTConn *dest_conn;
  const rgw_obj& dest_obj;
  const rgw_raw_obj status_obj;

  string upload_id;

public:

  RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncCtx *_sc,
                                RGWRESTConn *_dest_conn,
                                const rgw_obj& _dest_obj,
                                const rgw_raw_obj& _status_obj,
                                const string& _upload_id)
    : RGWCoroutine(_sc->cct),
      sc(_sc),
      dest_conn(_dest_conn),
      dest_obj(_dest_obj),
      status_obj(_status_obj),
      upload_id(_upload_id)
  {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      /* step 1: abort the upload on the destination (errors ignored, best effort) */
      yield call(new RGWAWSAbortMultipartCR(sc, dest_conn, dest_obj, upload_id));
      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj
                          << " upload_id=" << upload_id
                          << " retcode=" << retcode << dendl;
      }

      /* step 2: drop the sync status object (errors ignored, best effort) */
      yield call(new RGWRadosRemoveCR(sc->env->driver, status_obj));
      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj
                          << " retcode=" << retcode << dendl;
      }

      return set_cr_done();
    }

    return 0;
  }
};

/*
 * Coroutine that syncs one object to the cloud target via multipart upload.
 *
 * Progress is tracked in a status object (status_obj) in the zone's log pool:
 *   1. Read the status object. If it exists but describes a different
 *      version of the source object (mtime, size or etag differ), abort that
 *      upload and start over.
 *   2. When starting fresh, initiate a multipart upload on the destination
 *      and compute the part layout.
 *   3. Upload the parts one by one, storing the status after each part.
 *   4. Complete the multipart upload and remove the status object.
 * If a part upload or the completion fails, the upload is aborted (best
 * effort) and the coroutine finishes with the original error.
 *
 * conf, src_obj and dest_obj are stored as references, so the caller must
 * keep the referenced objects alive for the lifetime of this coroutine.
 */
class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  AWSSyncConfig& conf;
  RGWRESTConn *source_conn;
  std::shared_ptr<AWSSyncConfig_Profile> target;
  const rgw_obj& src_obj;
  const rgw_obj& dest_obj;

  uint64_t obj_size;
  string src_etag;
  rgw_sync_aws_src_obj_properties src_properties;
  rgw_rest_obj rest_obj;

  /* upload state, read from / written to status_obj */
  rgw_sync_aws_multipart_upload_info status;

  map<string, string> new_attrs;

  /* part currently being uploaded; points into status.parts */
  rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr};

  /* keeps the original error while the abort coroutine overwrites retcode */
  int ret_err{0};

  rgw_raw_obj status_obj;

public:
  RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncCtx *_sc,
                                rgw_bucket_sync_pipe& _sync_pipe,
                                AWSSyncConfig& _conf,
                                RGWRESTConn *_source_conn,
                                const rgw_obj& _src_obj,
                                std::shared_ptr<AWSSyncConfig_Profile>& _target,
                                const rgw_obj& _dest_obj,
                                uint64_t _obj_size,
                                const rgw_sync_aws_src_obj_properties& _src_properties,
                                const rgw_rest_obj& _rest_obj) : RGWCoroutine(_sc->cct),
                                                   sc(_sc),
                                                   sync_env(_sc->env),
                                                   conf(_conf),
                                                   source_conn(_source_conn),
                                                   target(_target),
                                                   src_obj(_src_obj),
                                                   dest_obj(_dest_obj),
                                                   obj_size(_obj_size),
                                                   src_properties(_src_properties),
                                                   rest_obj(_rest_obj),
                                                   status_obj(sync_env->svc->zone->get_zone_params().log_pool,
                                                              RGWBucketPipeSyncStatusManager::obj_status_oid(_sync_pipe, sc->source_zone, src_obj)) {
  }


  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield call(new RGWSimpleRadosReadCR<rgw_sync_aws_multipart_upload_info>(
                   dpp, sync_env->driver, status_obj, &status, false));

      if (retcode < 0 && retcode != -ENOENT) {
        ldpp_dout(dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
        /* must go through set_cr_error(): a bare return leaves the
         * coroutine neither done nor failed */
        return set_cr_error(retcode);
      }

      if (retcode >= 0) {
        /* check here that mtime and size did not change */

        if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
            status.src_properties.etag != src_properties.etag) {
          yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
          /* treat the stale upload as if no status had been found */
          retcode = -ENOENT;
        }
      }

      if (retcode == -ENOENT) {
        /* starting from scratch: drop whatever a stale status may have left behind */
        status.parts.clear();
        status.cur_ofs = 0;

        RGWAWSStreamPutCRF::init_send_attrs(dpp, sc->cct, rest_obj, src_properties, target.get(), &new_attrs);

        yield call(new RGWAWSInitMultipartCR(sc, target->conn.get(), dest_obj, obj_size, new_attrs, &status.upload_id));
        if (retcode < 0) {
          return set_cr_error(retcode);
        }

        status.obj_size = obj_size;
        status.src_properties = src_properties;
#define MULTIPART_MAX_PARTS 10000
        /* round up, so that the resulting number of parts never exceeds
         * MULTIPART_MAX_PARTS */
        uint64_t min_part_size = (obj_size + MULTIPART_MAX_PARTS - 1) / MULTIPART_MAX_PARTS;
        status.part_size = std::max(conf.s3.multipart_min_part_size, min_part_size);
        status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
        status.cur_part = 1;
      }

      for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) {
        yield {
          rgw_sync_aws_multipart_part_info& cur_part_info = status.parts[status.cur_part];

          /*
           * Derive the offset from the part number instead of trusting
           * status.cur_ofs: the stored status has cur_ofs already advanced
           * past the part named by cur_part, so a resumed upload would
           * otherwise re-send that part from the wrong offset.
           */
          const uint64_t part_ofs = static_cast<uint64_t>(status.cur_part - 1) * status.part_size;

          cur_part_info.part_num = status.cur_part;
          cur_part_info.ofs = part_ofs;
          cur_part_info.size = std::min(static_cast<uint64_t>(status.part_size),
                                        static_cast<uint64_t>(status.obj_size - part_ofs));

          pcur_part_info = &cur_part_info;

          status.cur_ofs = part_ofs + status.part_size;

          call(new RGWAWSStreamObjToCloudMultipartPartCR(sc,
                                                             source_conn, src_obj,
                                                             target,
                                                             dest_obj,
                                                             status.src_properties,
                                                             status.upload_id,
                                                             cur_part_info,
                                                             &cur_part_info.etag));
        }

        if (retcode < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
          ret_err = retcode;
          yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
          return set_cr_error(ret_err);
        }

        yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->driver, status_obj, status));
        if (retcode < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
          /* continue with upload anyway */
        }
        ldpp_dout(dpp, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
      }

      yield call(new RGWAWSCompleteMultipartCR(sc, target->conn.get(), dest_obj, status.upload_id, status.parts));
      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
        ret_err = retcode;
        yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
        return set_cr_error(ret_err);
      }

      /* remove status obj */
      yield call(new RGWRadosRemoveCR(sync_env->driver, status_obj));
      if (retcode < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to remove sync status obj=" << status_obj << " of obj=" << src_obj << " upload_id=" << status.upload_id << " (" << cpp_strerror(-retcode) << ")" << dendl;
        /* ignore error, best effort */
      }
      return set_cr_done();
    }

    return 0;
  }
};
/*
 * Decode the attribute named attr_name from attrs into *result.
 *
 * When the attribute is missing or its buffer is empty, *result is set to
 * def_val. Returns 0 in all those cases and on a successful decode, -EIO when
 * decoding throws buffer::error.
 */
template <class T>
int decode_attr(map<string, bufferlist>& attrs, const char *attr_name, T *result, T def_val)
{
  auto found = attrs.find(attr_name);
  if (found == attrs.end() || found->second.length() == 0) {
    /* nothing to decode: fall back to the default */
    *result = def_val;
    return 0;
  }

  auto pos = found->second.cbegin();
  try {
    decode(*result, pos);
  } catch (buffer::error&) {
    return -EIO;
  }
  return 0;
}

// maybe use Fetch Remote Obj instead?
/*
 * Callback coroutine that pushes one source object to the cloud target.
 *
 * It uses attrs, headers, size, mtime, etag, src_bucket and key, which are
 * not set in this class (presumably filled in by the RGWStatRemoteObjCBCR
 * base from the remote stat -- confirm there).
 *
 * Steps:
 *   1. Decode the pg version and source zone short id attributes. A decode
 *      failure is only logged.
 *   2. Resolve the connection to the source zone and the target
 *      profile/bucket/key for this object.
 *   3. Create the destination bucket, tolerating a "BucketAlreadyOwnedByYou"
 *      error response.
 *   4. Copy the object: a plain streaming PUT when size is below
 *      multipart_sync_threshold, a multipart upload otherwise.
 */
class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
  rgw_bucket_sync_pipe sync_pipe;
  AWSSyncInstanceEnv& instance;

  uint64_t versioned_epoch{0};

  RGWRESTConn *source_conn{nullptr};
  std::shared_ptr<AWSSyncConfig_Profile> target;
  bufferlist res;
  /* destination buckets already created through this coroutine instance */
  unordered_map <string, bool> bucket_created;
  rgw_rest_obj rest_obj;
  int ret{0};

  uint32_t src_zone_short_id{0};
  uint64_t src_pg_ver{0};

  /* response body of the bucket creation request */
  bufferlist out_bl;

  /* error document returned by a failed bucket creation */
  struct CreateBucketResult {
    string code;

    void decode_xml(XMLObj *obj) {
      RGWXMLDecoder::decode_xml("Code", code, obj);
    }
  } result;

  rgw_obj src_obj;
  rgw_obj dest_obj;

public:
  RGWAWSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
                            rgw_bucket_sync_pipe& _sync_pipe,
                            rgw_obj_key& _key,
                            AWSSyncInstanceEnv& _instance,
                            uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
                                                         sync_pipe(_sync_pipe),
                                                         instance(_instance), versioned_epoch(_versioned_epoch)
  {}

  ~RGWAWSHandleRemoteObjCBCR(){
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      ret = decode_attr(attrs, RGW_ATTR_PG_VER, &src_pg_ver, (uint64_t)0);
      if (ret < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
      } else {
        ret = decode_attr(attrs, RGW_ATTR_SOURCE_ZONE, &src_zone_short_id, (uint32_t)0);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl;
          src_pg_ver = 0; /* all or nothing */
        }
      }
      ldpp_dout(dpp, 4) << "AWS: download begin: z=" << sc->source_zone
                              << " b=" << src_bucket << " k=" << key << " size=" << size
                              << " mtime=" << mtime << " etag=" << etag
                              << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver
                              << dendl;

      source_conn = sync_env->svc->zone->get_zone_conn(sc->source_zone);
      if (!source_conn) {
        ldpp_dout(dpp, 0) << "ERROR: cannot find http connection to zone " << sc->source_zone << dendl;
        return set_cr_error(-EINVAL);
      }

      instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
      instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &dest_obj.bucket.name, &dest_obj.key.name);

      if (bucket_created.find(dest_obj.bucket.name) == bucket_created.end()){
        yield {
          ldpp_dout(dpp, 0) << "AWS: creating bucket " << dest_obj.bucket.name << dendl;
          bufferlist bl;
          call(new RGWPutRawRESTResourceCR <bufferlist> (sc->cct, target->conn.get(),
                                                  sync_env->http_manager,
                                                  dest_obj.bucket.name, nullptr, bl, &out_bl));
        }
        if (retcode < 0 ) {
          /*
           * The request failed. Look at the error document: only
           * "BucketAlreadyOwnedByYou" is acceptable, everything else
           * (including an unreadable response) fails with the request's
           * error code.
           */
          RGWXMLDecoder::XMLParser parser;
          if (!parser.init()) {
            ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing bucket creation error response from server" << dendl;
            return set_cr_error(retcode);
          }

          if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
            string str(out_bl.c_str(), out_bl.length());
            ldpp_dout(dpp, 5) << "ERROR: failed to parse xml: " << str << dendl;
            return set_cr_error(retcode);
          }

          try {
            RGWXMLDecoder::decode_xml("Error", result, &parser, true);
          } catch (RGWXMLDecoder::err& err) {
            string str(out_bl.c_str(), out_bl.length());
            ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
            return set_cr_error(retcode);
          }

          if (result.code != "BucketAlreadyOwnedByYou") {
            return set_cr_error(retcode);
          }
        }

        bucket_created[dest_obj.bucket.name] = true;
      }

      yield {
        src_obj.bucket = src_bucket;
        src_obj.key = key;

        /* init output */
        rgw_sync_aws_src_obj_properties src_properties;
        src_properties.mtime = mtime;
        src_properties.etag = etag;
        src_properties.zone_short_id = src_zone_short_id;
        src_properties.pg_ver = src_pg_ver;
        src_properties.versioned_epoch = versioned_epoch;

        if (size < instance.conf.s3.multipart_sync_threshold) {
          call(new RGWAWSStreamObjToCloudPlainCR(sc, source_conn, src_obj,
                                                 src_properties,
                                                 target,
                                                 dest_obj));
        } else {
          /* named differently from the rest_obj member to avoid shadowing it;
           * passed to the multipart coroutine's constructor as const& */
          rgw_rest_obj decoded_rest_obj;
          decoded_rest_obj.init(key);
          if (do_decode_rest_obj(dpp, sc->cct, attrs, headers, &decoded_rest_obj)) {
            ldpp_dout(dpp, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
            return set_cr_error(-EINVAL);
          }
          call(new RGWAWSStreamObjToCloudMultipartCR(sc, sync_pipe, instance.conf, source_conn, src_obj,
                                                     target, dest_obj, size, src_properties, decoded_rest_obj));
        }
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      return set_cr_done();
    }

    return 0;
  }
};

/*
 * Coroutine for syncing one object to the cloud target. It supplies an
 * RGWAWSHandleRemoteObjCBCR as the callback of RGWCallStatRemoteObjCR.
 */
class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
  rgw_bucket_sync_pipe sync_pipe;
  AWSSyncInstanceEnv& instance;
  uint64_t versioned_epoch;

public:
  RGWAWSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
                              rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
                              AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch)
    : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
      sync_pipe(_sync_pipe),
      instance(_instance),
      versioned_epoch(_versioned_epoch)
  {}

  ~RGWAWSHandleRemoteObjCR() {}

  /* Returns a newly allocated callback coroutine for this object. */
  RGWStatRemoteObjCBCR *allocate_callback() override {
    return new RGWAWSHandleRemoteObjCBCR(sc, sync_pipe, key,
                                         instance, versioned_epoch);
  }
};

/*
 * Coroutine that removes an object from the cloud target.
 *
 * Resolves the target profile and destination path for (source bucket, key)
 * and issues a DELETE on that path. Finishes with the request's error code
 * when it is negative.
 */
class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  std::shared_ptr<AWSSyncConfig_Profile> target;
  rgw_bucket_sync_pipe sync_pipe;
  rgw_obj_key key;
  ceph::real_time mtime;
  AWSSyncInstanceEnv& instance;
public:
  RGWAWSRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc,
                          rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
                          AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sc->cct), sc(_sc),
                                                        sync_pipe(_sync_pipe), key(_key),
                                                        mtime(_mtime), instance(_instance) {}
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      ldpp_dout(dpp, 0) << instance.id << ": remove remote obj: z=" << sc->source_zone
                              << " b=" <<sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
      yield {
        instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
        string path =  instance.conf.get_path(target, sync_pipe.dest_bucket_info, key);
        ldpp_dout(dpp, 0) << "AWS: removing aws object at " << path << dendl;

        call(new RGWDeleteRESTResourceCR(sc->cct, target->conn.get(),
                                         sc->env->http_manager,
                                         path, nullptr /* params */));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }

};


/*
 * Data sync module for the AWS (cloud) sync target.
 *
 * sync_object() and remove_object() return newly allocated coroutines that
 * perform the operation; create_delete_marker() is not implemented and
 * returns a null coroutine.
 */
class RGWAWSDataSyncModule: public RGWDataSyncModule {
  CephContext *cct;
  AWSSyncInstanceEnv instance;

public:
  RGWAWSDataSyncModule(CephContext *_cct, AWSSyncConfig& _conf)
    : cct(_cct),
      instance(_conf)
  {}

  void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
    instance.init(sc, instance_id);
  }

  ~RGWAWSDataSyncModule() {}

  /* A missing versioned_epoch is treated as 0. */
  RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                            std::optional<uint64_t> versioned_epoch,
                            rgw_zone_set *zones_trace) override {
    const uint64_t epoch = versioned_epoch.value_or(0);
    ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket
                      << " k=" << key
                      << " versioned_epoch=" << epoch << dendl;
    return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, epoch);
  }

  RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
                              rgw_zone_set *zones_trace) override {
    ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket
                      << " k=" << key
                      << " mtime=" << mtime
                      << " versioned=" << versioned
                      << " versioned_epoch=" << versioned_epoch << dendl;
    return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance);
  }

  /* Not implemented for this module: only logs the request. */
  RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
                                     rgw_zone_set *zones_trace) override {
    ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket
                      << " k=" << key
                      << " mtime=" << mtime
                      << " versioned=" << versioned
                      << " versioned_epoch=" << versioned_epoch << dendl;
    return nullptr;
  }
};

/*
 * Sync module instance for the AWS target; owns the data handler that is
 * handed out through get_data_handler().
 */
class RGWAWSSyncModuleInstance : public RGWSyncModuleInstance {
  RGWAWSDataSyncModule data_handler;

public:
  RGWAWSSyncModuleInstance(CephContext *cct, AWSSyncConfig& _conf)
    : data_handler(cct, _conf)
  {}

  /* The returned pointer refers to a member of this instance. */
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
};

/*
 * Build an AWS sync module instance from its JSON configuration.
 *
 * Returns the (negative) result of AWSSyncConfig::init() when it fails and
 * leaves *instance untouched; otherwise stores the new instance in *instance
 * and returns 0.
 */
int RGWAWSSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config,  RGWSyncModuleInstanceRef *instance){
  AWSSyncConfig conf;

  if (int r = conf.init(dpp, cct, config); r < 0) {
    return r;
  }

  instance->reset(new RGWAWSSyncModuleInstance(cct, conf));
  return 0;
}
